// 娑堟伅澶勭悊绾跨▼
package com.thread;

import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.util.concurrent.BlockingQueue;

import com.config.CommandParse;
import com.databus.MultiPageManager;
import com.google.protobuf.ByteString;
import com.google.protobuf.GeneratedMessage;
import com.google.protobuf.UninitializedMessageException;
import com.googlecode.protobuf.format.JsonFormat;
import com.databus.*;
import com.packet.PackageData;
import com.packet.PackageHeader;
import com.protobuf.MsgExpress;
import com.tool.ObjectConvertor;
import com.tool.ZlibUtils;

public class MsgLoopThread implements Runnable, TimeOutCallBack{
	
	public MsgLoopThread(MessageCallBack cb, BlockingQueue<PackageData> dataQueue, boolean bDeserialized)
	{
		mCallBack = cb;
		mPackDataQueue = dataQueue;
		mDeserialized = bDeserialized;

	}
	
	public MsgLoopThread(MessageCallBack cb, BlockingQueue<PackageData> dataQueue, TimeOutThread timeout, boolean bDeserialized)
	{
		mCallBack = cb;
		mPackDataQueue = dataQueue;
		mDeserialized = bDeserialized;

		if(null != timeout) 
		{
			mPageManager = new MultiPageManager(timeout);
		}
	}
	
	@Override
	public void OnMessageTimeOut(int serNum) 
	{
		if(mPageManager != null) 
		{
			mPageManager.OnMessageTimeOut(serNum);
		}
	}

	@Override
	public void run() {
		Log.info("Start MsgLoopThread Send Thread...");
		while(!Thread.interrupted())
		{
			try {
				PackageData packData = mPackDataQueue.take();
				if(mDeserialized 
						|| PackageHeader.MsgType.Publish == packData.mHeader.type 
						|| 0 == CommandParse.GetServiceId(packData.mHeader.command))
				{
					if(!DeserializedPackData(packData))
					{
						Log.error("DeserializedPackData failed!");
						continue;
					}
				}
				
				if(mDeserialized) {
					if(packData.mHeader.isMulti()) {
						if(packData.mHeader.getMultiPageNo() != 0 || packData.mMsg == null) {
							continue;
						}
					}
				}
				if(packData.mMsg instanceof MsgExpress.KickOffApp)
				{
					Log.error("app was kick off");
					mCallBack.OnConnectChange(MessageCallBack.EConnectStatus.ConnectStatus_Offline);
					break;
				}
				if(packData.mMsg instanceof MsgExpress.AppServerList)
				{
					MsgExpress.AppServerList appSvrList = (MsgExpress.AppServerList)packData.mMsg;
					if(appSvrList.getIsMasterSlaveMode())
					{
						Log.info("Master Or Slave Change, serverid = " + appSvrList.getServiceid());
						int addr = mCallBack.GetAddr();
						if(appSvrList.getAddrsCount() <= 1 || appSvrList.getAddrs(0) == addr)
						{
							Log.info("I am master server");
							mCallBack.OnConnectChange(MessageCallBack.EConnectStatus.ConnectStatus_Master);
						}
						else
						{
							Log.info("I am slave server");
							mCallBack.OnConnectChange(MessageCallBack.EConnectStatus.ConnectStatus_Slave);
						}
					}
					continue;
				}
				else if(packData.mMsg instanceof MsgExpress.RegisterService)
				{
					CommandParse.Register((MsgExpress.RegisterService)packData.mMsg);
					continue;
				}
				if(PackageHeader.MsgType.Request == packData.mHeader.type)
				{
					mCallBack.OnRequest(packData);
				}
				else if(PackageHeader.MsgType.Response == packData.mHeader.type)
				{
					mCallBack.OnResponse(packData);
				}
				else if(PackageHeader.MsgType.Publish == packData.mHeader.type)
				{
					mCallBack.OnPublish(packData);
				}
			} 
			catch(InterruptedException e){
				break;
			}
			catch (Exception e) {
				e.printStackTrace();
			}
		}
		Log.info("End MsgLoopThread Send Thread...");
	}

	// 鍙嶅簭鍒楀寲protobuf
	private Object getMsg(String classname,byte[] data,short protocol)
	{
		if(null == classname || classname.isEmpty())
		    return null;
		Object msg = null;
    	Method m;
		try {
			Class clazz=null;
			if(loader!=null)
				clazz=loader.loadClass(classname);
			if(clazz==null)
				clazz= Class.forName(classname);
			if(clazz!=null) {
				if (protocol == MsgExpress.SerializeType.PROTOBUF_VALUE) {
					m = clazz.getMethod("parseFrom", new Class[]{byte[].class});
					msg = (GeneratedMessage) m.invoke(clazz, new Object[]{data});
				} else if (protocol == MsgExpress.SerializeType.JSON_VALUE) {
					if(GeneratedMessage.class.isAssignableFrom(clazz))
					    msg = ObjectConvertor.JsonToProtobuf(new String(data), clazz);
					else
						msg = ObjectConvertor.JsonToBean(new String(data), clazz);
				} else {
					Log.error("To be implemented");
				}
			}
			else
				Log.error("Load class failed,classname:"+clazz);
		} catch (Exception e) {
			e.printStackTrace();
			Log.error(e);
			msg= MsgExpress.ErrMessage.newBuilder().setErrcode(1).setErrmsg(e.toString()).build();
		}
		return msg;
	}

	private boolean DeserializedPackData(PackageData pd)
	{
		if(null == pd)
		{
			Log.error("PackageData is null");
			return false;
		}
		
		if(pd.mHeader.IsZip())
		{
			pd.mData = ZlibUtils.decompress(pd.mData);
		}
		
		if(pd.mHeader.IsExt())
		{
			ByteBuffer temp = ByteBuffer.wrap(pd.mData);
			
			short extSize = temp.getShort();
			int classNameSize = temp.get() & 0xff;
			
			byte[] names = new byte[classNameSize];
			temp.get(names);
			
			byte[] extData = new byte[extSize - classNameSize - 3];
			temp.get(extData);
			
			byte[] data = new byte[pd.mData.length - extSize];
			temp.get(data);
			pd.mData = data;
			
			String clazz = "com.protobuf." + new String(names).replace('.', '$');
			pd.mExtMsg = getMsg(clazz, extData,(short)0);
		}
		
		if(mPageManager != null) {
			pd = mPageManager.addPackageData(pd);
			if(pd == null) {
				return true;
			}
		}
		
		String msgName = CommandParse.GetClass(pd.mHeader.command);
		if(null == msgName)
		{
			Log.error("Cannot get msg name, type = " + pd.mHeader.type + " cmd = " + pd.mHeader.command);
			return false;
		}
		short protocol=pd.mHeader.GetProtocol();
		Object msg = getMsg(msgName, pd.mData,protocol);
		if(null == msg)
		{
			Log.error("get message failed, msgName = " + msgName);
			return false;
		}
		pd.mMsg = msg;
		
		return true;
	}

	public void setClassLoader(ClassLoader loader)
	{
		this.loader=loader;
	}
	private boolean mDeserialized;
	private MessageCallBack mCallBack;
	private BlockingQueue<PackageData> mPackDataQueue;
	private MultiPageManager mPageManager = null;
    private ClassLoader loader=null;
}
